Note: this notebook should be launched via pyspark with IPython Notebook as the Python frontend, e.g. IPYTHON_OPTS="notebook" ./bin/pyspark for that release (see https://spark.apache.org/docs/0.9.1/python-programming-guide.html).
In [2]:
%matplotlib inline
import pyspark
import matplotlib.pyplot as plt
import numpy as np
import scipy
import re
In [3]:
xs = range(5)
In [4]:
map(lambda x: x*x, xs)
Out[4]:
[0, 1, 4, 9, 16]
In [5]:
reduce(lambda acc, x: acc / (x + 1), xs, 1.0)
Out[5]:
0.008333333333333333
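The accumulator is threaded through successive divisions: 1.0 / 1 / 2 / 3 / 4 / 5 = 1/120 ≈ 0.00833.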
In [6]:
filter(lambda x: x % 3 == 1, xs)
Out[6]:
[1, 4]
In [7]:
from pyspark import SparkContext
In [8]:
sc = SparkContext()
In [9]:
nums = sc.parallelize(xrange(0, 25)).persist()
text = sc.textFile("file:///root/notebook/Sherlock.txt").persist()
In [10]:
print nums.map(lambda x: x + 1).collect()
In [11]:
words = text.flatMap(lambda x: re.findall("[a-zA-Z']+", x)).map(lambda x: x.strip().lower()).persist()
words.take(10)
Out[11]:
In [12]:
print nums.filter(lambda x: x % 2 == 0).collect()
In [13]:
print nums.flatMap(lambda x: [x] if x % 2 == 0 else []).collect()
In [14]:
# seqOp concatenates digits within each partition; combOp joins the
# per-partition strings with spaces, so the spacing reveals the partitioning
print nums.aggregate("", lambda acc, x: acc + str(x), lambda acc1, acc2: acc1 + " " + acc2)
In [15]:
# with everything in a single partition, the digits come back as one unbroken run
print nums.repartition(1).aggregate("", lambda acc, x: acc + str(x), lambda acc1, acc2: acc1 + " " + acc2)
In [16]:
print nums.fold(0, lambda a, b: a + b)
In [17]:
print nums.reduce(lambda a, b: a + b)
In [18]:
print nums.max(), nums.min()
In [19]:
# groupByKey materializes every occurrence of a word before summing;
# the reduceByKey version below combines counts map-side instead
words.map(lambda w: (w, 1)).groupByKey().map(lambda (w, c): (w, sum(c))).take(10)
Out[19]:
In [20]:
count = words.map(lambda w: (w, 1)).reduceByKey(lambda c1, c2: c1 + c2).persist()
count.take(20)
Out[20]:
In [21]:
wordsLen = words.distinct().map(lambda w: (w, len(w))).persist()
wordsLen.take(10)
Out[21]:
In [22]:
count.cogroup(wordsLen).map(lambda (w, (counts, lens)): (w, list(counts), list(lens))).take(10)
Out[22]:
In [23]:
count.join(wordsLen).take(10)
Out[23]:
In [24]:
wordCount = sc.textFile("file:///root/notebook/Sherlock.txt").flatMap(
    lambda x: re.findall("[a-zA-Z']+", x)
).map(
    lambda x: x.strip().lower()
).map(
    lambda w: (w, 1)
).reduceByKey(
    lambda c1, c2: c1 + c2
).persist()
### Find top N words
def top(N):
    def reducer(wc1, wc2):
        # merge two candidate lists and keep the N entries with the highest counts
        merged = wc1 + wc2
        return sorted(merged, key=lambda pair: -pair[1])[:N]
    return reducer
top10 = wordCount.map(lambda wc: [wc]).reduce(top(10))
for word, c in top10:
    print word, "\t", c
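The same list can be read off more directly with RDD.takeOrdered, which performs the bounded top-N sort for you:

top10 = wordCount.takeOrdered(10, key=lambda wc: -wc[1])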
In [25]:
top250 = wordCount.map(lambda wc: [wc]).reduce(top(250))
plt.bar(range(250), sorted([ c for w, c in top250 ], reverse=True)[:250], color="green", edgecolor="green")
Out[25]:
In [26]:
freqByLen = sorted(
    wordCount.map(
        lambda (w, c): (len(w), c)
    ).reduceByKey(
        lambda c1, c2: c1 + c2
    ).collect(),
    key=lambda (l, c): l
)
In [27]:
plt.bar(range(len(freqByLen)), [ c for (l, c) in freqByLen], color="red")
Out[27]:
In [28]:
wordsN = words.count()
fractions = [i / 100.0 for i in range(1, 101)]
counts = list()
uniqs = list()
for fraction in fractions:
    # for each sampling fraction, record the exact sample size together
    # with an approximate distinct-word count
    sample = words.sample(withReplacement=False, fraction=fraction).persist()
    uniq = sample.countApproxDistinct()
    count = sample.count()
    uniqs.append(uniq)
    counts.append(count)
    sample.unpersist()
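countApproxDistinct is a HyperLogLog-style estimate. As a sanity check, it can be compared against the exact distinct count on the full RDD; relativeSD is the optional accuracy knob:

print words.distinct().count()                     # exact
print words.countApproxDistinct(relativeSD=0.05)   # approximate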
In [29]:
plt.figure()
plt.plot(counts, uniqs, ".")
plt.show()
In [30]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
import pandas as pd
In [31]:
from math import isnan
def parsePoints(df):
data = list()
for i in xrange(len(df)):
survived = float(df["Survived"][i])
age = float(df["Age"][i])
pClass = float(df["Pclass"][i])
nameL = float(len(df["Name"][i]))
sex = 1.0 if df["Sex"][i] == "male" else 0.0
sibSp = float(df["SibSp"][i])
parch = float(df["Parch"][i])
fare = float(df["Fare"][i])
features = [age, pClass, nameL, sex, sibSp, parch, fare]
        # skip passengers with any missing numeric field
        if any(isnan(f) for f in features):
continue
p = LabeledPoint(survived, features)
data.append(p)
return data
df = pd.read_csv("./train.csv")
data = parsePoints(df)
print "N:", len(data)
train, test = sc.parallelize(data).randomSplit([0.5, 0.5], seed=2)
print "Train positive:", train.filter(lambda p: p.label == 1.0).count()
print "Train negative:", train.filter(lambda p: p.label == 0.0).count()
print "Test positive:", test.filter(lambda p: p.label == 1.0).count()
print "Test negative:", test.filter(lambda p: p.label == 0.0).count()
In [32]:
logisticM = LogisticRegressionWithSGD.train(train, 250)
logisticM
Out[32]:
In [33]:
# with the default 0.5 threshold, the model's predict returns a hard 0/1 label
validation = test.map(
    lambda p: (int(round(p.label)), logisticM.predict(p.features))
).persist()
validation.take(10)
Out[33]:
In [34]:
total = float(validation.count())
tp = validation.filter(lambda (real, pred): real == 1 and pred == 1).count() / total
fp = validation.filter(lambda (real, pred): real == 0 and pred == 1).count() / total
tn = validation.filter(lambda (real, pred): real == 0 and pred == 0).count() / total
fn = validation.filter(lambda (real, pred): real == 1 and pred == 0).count() / total
print "tp:", "%.2f" % tp
print "fp:", "%.2f" % fp
print "tn:", "%.2f" % tn
print "fn:", "%.2f" % fn
In [35]:
def logistic(w):
    from math import exp
    # manual sigmoid over the raw score w.x; LogisticRegressionWithSGD
    # trains without an intercept by default, so no bias term is needed here
    def predict(x):
        z = w.dot(x)
        return 1 / (1 + exp(-z))
    return predict
predict_proba = logistic(logisticM.weights)
predicted_proba = test.map(
    lambda p: (p.label, predict_proba(p.features))
).collect()
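On Spark versions where LogisticRegressionModel supports clearThreshold(), the model can produce these probabilities itself: after clearing the threshold, predict returns the raw sigmoid output instead of a 0/1 label. A version-dependent alternative sketch:

logisticM.clearThreshold()  # predict now returns probabilities
proba_alt = test.map(lambda p: (p.label, logisticM.predict(p.features))).collect()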
In [36]:
from sklearn.metrics import roc_curve, auc
# Compute ROC curve and the area under the curve
fpr, tpr, thresholds = roc_curve([ r for r, p in predicted_proba ], [ p for r, p in predicted_proba ])
roc_auc = auc(fpr, tpr)
# Plot ROC curve
fig = plt.figure(figsize=(7,5))
ax = fig.add_subplot(111)
plt.plot(fpr, tpr, color='lightblue', lw=2, label='ROC curve')
plt.plot([0, 1], [0, 1], color='black', lw=2, linestyle='dotted', label='random guessing')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic (ROC)')
plt.legend(loc="lower right")
ax.annotate('AUC = %0.2f' % roc_auc, xy=(0.35, 0.6))
plt.show()
In [37]:
sc.version
Out[37]:
In [38]:
from pyspark.mllib.tree import GradientBoostedTrees
# a regressor (rather than a classifier) is trained so that predictions are
# continuous scores, which the ROC curve below needs; MLlib tree models wrap
# a JVM object, so predict is applied driver-side to the collected test set
GBTModel = GradientBoostedTrees.trainRegressor(train, categoricalFeaturesInfo={}, numIterations=20, maxDepth=3)
print GBTModel
testGBT = [ (p.label, GBTModel.predict(p.features)) for p in test.collect() ]
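If hard 0/1 predictions were wanted instead of scores, MLlib also offers GradientBoostedTrees.trainClassifier (log-loss by default); a sketch, not what the ROC analysis below uses:

GBTClf = GradientBoostedTrees.trainClassifier(train, categoricalFeaturesInfo={}, numIterations=20, maxDepth=3)
testGBTLabels = [ (p.label, GBTClf.predict(p.features)) for p in test.collect() ]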
In [39]:
from sklearn.metrics import roc_curve, auc
# Compute ROC curve and the area under the curve
fpr, tpr, thresholds = roc_curve([ r for r, p in testGBT ], [ p for r, p in testGBT ])
roc_auc = auc(fpr, tpr)
# Plot ROC curve
fig = plt.figure(figsize=(7,5))
ax = fig.add_subplot(111)
plt.plot(fpr, tpr, color='lightblue', lw=2, label='ROC curve')
plt.plot([0, 1], [0, 1], color='black', lw=2, linestyle='dotted', label='random guessing')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic (ROC)')
plt.legend(loc="lower right")
ax.annotate('AUC = %0.2f' % roc_auc, xy=(0.35, 0.6))
plt.show()
In [ ]: